{% if params|length == 0 %}
private Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> {{ name.lower_camel_case }} = create{{ name.upper_camel_case }}();
{% endif %}

private Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> create{{ name.upper_camel_case }}({% for param in params %}{{ param.type_info.name }} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) {
  {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request request
      = {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request.newBuilder()
          {%- for param in params %}
            {%- if param.type_info.is_primitive %}
          .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }})
            {%- elif param.type_info.is_repeated %}
          .addAll{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.stream().map(elem -> elem.rpc{{ param.type_info.inner_name }}())::iterator)
            {%- else %}
          .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.rpc{{ param.type_info.name }}())
            {%- endif %}
          {%- endfor %}
          .build();

  Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> flowable
    = Flowable.create(emitter -> {
      scheduler.scheduleDirect(() -> {
        stub.subscribe{{ name.upper_camel_case }}(request,
            new StreamObserver<{{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response>() {

          @Override
          public void onNext({{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response value) {
          {%- if has_result %}
          {{ plugin_name.upper_camel_case }}Result result = {{ plugin_name.upper_camel_case }}Result.translateFromRpc(value.get{{ plugin_name.upper_camel_case }}Result());

            switch (result.result) {
              case SUCCESS:
                emitter.onComplete();
                break;
              case NEXT:
                {%- if return_type.is_repeated %}
                    {%- if return_type.is_primitive %}
                emitter.onNext(value.get{{ return_name.upper_camel_case }}List());
                    {%- else %}
                emitter.onNext(value.get{{ return_name.upper_camel_case }}List().stream().map({{ return_type.inner_name }}::translateFromRpc).collect(Collectors.toList()));
                    {%- endif %}
                {%- else %}
                emitter.onNext({{ return_type.name }}.translateFromRpc(value.get{{ return_name.upper_camel_case }}()));
                {%- endif %}
                break;
              default:
                emitter.onError(new {{ plugin_name.upper_camel_case }}Exception(result.result, result.resultStr));
                break;
            }
          {%- else %}
            {%- if return_type.is_repeated %}
              {%- if return_type.is_primitive %}
          emitter.onNext(value.get{{ return_name.upper_camel_case }}List());
              {%- else %}
          emitter.onNext(value.get{{ return_name.upper_camel_case }}List().stream().map({{ return_type.inner_name }}::translateFromRpc).collect(Collectors.toList()));
              {%- endif %}
            {%- else %}
              {%- if return_type.is_primitive %}
          emitter.onNext(value.get{{ return_name.upper_camel_case }}());
              {%- else %}
          emitter.onNext({{ return_type.name }}.translateFromRpc(value.get{{ return_name.upper_camel_case }}()));
              {%- endif %}
            {%- endif %}
          {%- endif %}
          }

          @Override
          public void onError(Throwable t) {
            emitter.onError(t);
          }

          @Override
          public void onCompleted() {
            emitter.onComplete();
          }
        });
      });
    }, BackpressureStrategy.BUFFER);

  return flowable
  .replay(1)
  .autoConnect();
}

public Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> get{{ name.upper_camel_case }}({% for param in params %}{{ param.type_info.name }} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) {
  {% if params|length > 0 %}
  return create{{ name.upper_camel_case }}({% for param in params %}{{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %});
  {% else %}
  return {{ name.lower_camel_case }};
  {% endif %}
}
